package com.chenjl.trace.transport.extflume;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.chenjl.trace.model.Annotation;
import com.chenjl.trace.model.BinaryAnnotation;
import com.chenjl.trace.model.Endpoint;
import com.chenjl.trace.model.Span;
import com.chenjl.trace.protobuf.TraceProtobuf.ProtoAnnotation;
import com.chenjl.trace.protobuf.TraceProtobuf.ProtoBinaryAnnotation;
import com.chenjl.trace.protobuf.TraceProtobuf.ProtoEndpoint;
import com.chenjl.trace.protobuf.TraceProtobuf.ProtoSpan;
/**
 * Flume sink that drains events from the channel, parses the JSON span
 * payload, converts each span to protobuf and publishes it to a Kafka topic.
 * 2018-10-17 19:02:29
 * @author chenjinlong
 */
public class KafkaSink extends AbstractSink implements Configurable {
	// SLF4J convention: one static final logger keyed on the class literal.
	private static final Logger log = LoggerFactory.getLogger(KafkaSink.class);

	// DateTimeFormatter is immutable and thread-safe (unlike SimpleDateFormat),
	// so it is built once instead of on every send() call.
	private static final DateTimeFormatter MESSAGE_ID_FORMAT =
			DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

	// Monotonically increasing suffix keeping message ids unique within one second.
	private final AtomicLong atomicLong = new AtomicLong(1);

	private String bootstrapServers = null;
	private String topicName = null;
	private Producer<String,byte[]> producer = null;
	private String domainName = null;
	private String ip = null;


	/**
	 * Reads the sink configuration (bootstrap.servers / topic.name / domain.name),
	 * resolves the local host address used in message ids, and creates the
	 * Kafka producer (String key, byte[] protobuf value).
	 *
	 * @param context Flume sink context carrying the agent configuration
	 * @throws RuntimeException if the local host address cannot be resolved
	 */
	@Override
	public void configure(Context context) {
		this.bootstrapServers = context.getString("bootstrap.servers");
		this.topicName = context.getString("topic.name");
		this.domainName = context.getString("domain.name");

		log.info("KafkaSink读取配置-->>---bootstrapServers : {}, topicName:{} ,domainName : {}",bootstrapServers,topicName,domainName);

		try {
			this.ip = InetAddress.getLocalHost().getHostAddress();
		}
		catch (UnknownHostException e) {
			// Fail fast: without the local ip no message id can be built.
			// No "{}" placeholder here — the throwable must be the trailing
			// argument for SLF4J to log the stack trace.
			log.error("读取本机ip发生错误", e);
			throw new RuntimeException(e);
		}

		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,ByteArraySerializer.class);
		producer = new KafkaProducer<String,byte[]>(props);
	}
	@Override
	public synchronized void start() {
		super.start();

		log.info("KafkaSink启动-->>---");
	}
	@Override
	public synchronized void stop() {
		super.stop();

		log.info("KafkaSink关闭-->>---");
		// Guard against stop() being invoked before configure() built the producer.
		if (this.producer != null) {
			this.producer.close();
		}
	}
	/**
	 * Takes at most one event from the channel per invocation and forwards its
	 * body to Kafka inside a channel transaction.
	 *
	 * @return {@code READY} when an event was delivered, {@code BACKOFF} when
	 *         the channel was empty or delivery failed
	 */
	@Override
	public Status process() throws EventDeliveryException {
		Status status = Status.READY;
        Channel channel = super.getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();

        try {
        	Event event = channel.take();
        	if(event != null) {
        		// StandardCharsets avoids the checked UnsupportedEncodingException
        		// of new String(byte[], String) and any platform-charset surprise.
        		String content = new String(event.getBody(), StandardCharsets.UTF_8);
        		this.send(content);
        	}
        	else {
        		status = Status.BACKOFF;
        	}

        	// Once begun, a transaction must be committed or rolled back.
        	transaction.commit();
        }
        catch(Exception e) {
        	// Throwable as trailing argument (no placeholder) so the stack trace is logged.
        	log.error("KafkaSink process-->>---,出现未知的异常", e);
            transaction.rollback();
            status = Status.BACKOFF;
        }
        finally {
            transaction.close();
        }

        // DEBUG, not INFO: this fires on every poll cycle and would flood the log.
        log.debug("KafkaSink process执行结束-->>---, 状态 : {}",status);
		return status;
	}
	/**
	 * Parses the JSON payload into spans, converts each span to protobuf and
	 * publishes it to the configured topic. Blocks until all records are
	 * flushed so the caller's transaction commit really means "delivered".
	 *
	 * @param content JSON array of {@link Span} objects
	 */
	private void send(String content) {
		String dateStr = LocalDateTime.now().format(MESSAGE_ID_FORMAT);

		List<Span> spans = JSONObject.parseArray(content,Span.class);
		for(Span span : spans) {
			// Message key: ip#timestamp#counter — unique per sink instance.
			String messageId = ip+"#"+dateStr+"#"+atomicLong.getAndIncrement();

			ProtoSpan protoSpan = buildProtoSpan(span);

			ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<String, byte[]>(topicName,messageId,protoSpan.toByteArray());
			producer.send(producerRecord);
		}

		// KafkaProducer.send() is asynchronous; flush before returning so a
		// broker failure surfaces here (and rolls the Flume transaction back)
		// instead of silently losing events after commit.
		producer.flush();

		log.info("KafkaSink发送消息完毕-->>---, 发送消息数量 : {}",spans.size());
	}
	/**
	 * Converts one span, including its annotations and binary annotations,
	 * to its protobuf representation.
	 *
	 * @param span the deserialized span
	 * @return the fully populated protobuf span
	 */
	private ProtoSpan buildProtoSpan(Span span) {
		ProtoSpan.Builder protoSpanBuilder = ProtoSpan.newBuilder();
		protoSpanBuilder.setTraceId(span.getTraceId());
		protoSpanBuilder.setParentId(span.getParentId());
		protoSpanBuilder.setId(span.getId());
		protoSpanBuilder.setName(span.getName());
		protoSpanBuilder.setSpanType(span.getSpanType());
		protoSpanBuilder.setTimestamp(span.getTimestamp());
		protoSpanBuilder.setDuration(span.getDuration());
		protoSpanBuilder.setDomainName(domainName);
		protoSpanBuilder.setIp(ip);

		// Event-type annotations.
		for(Annotation annotation : span.getAnnotations()) {
			ProtoAnnotation.Builder protoAnnotationBuilder = ProtoAnnotation.newBuilder();
			protoAnnotationBuilder.setEndpoint(buildProtoEndpoint(annotation.getEndpoint()));
			protoAnnotationBuilder.setTimestamp(annotation.getTimestamp());
			protoAnnotationBuilder.setValue(annotation.getValue());

			protoSpanBuilder.addAnnotations(protoAnnotationBuilder.build());
		}

		// RPC key/value annotations.
		for(BinaryAnnotation binaryAnnotation : span.getBinaryAnnotations()) {
			ProtoBinaryAnnotation.Builder protoBinaryAnnotationBuilder = ProtoBinaryAnnotation.newBuilder();
			protoBinaryAnnotationBuilder.setEndpoint(buildProtoEndpoint(binaryAnnotation.getEndpoint()));
			protoBinaryAnnotationBuilder.setKey(binaryAnnotation.getKey());
			protoBinaryAnnotationBuilder.setValue(binaryAnnotation.getValue());

			protoSpanBuilder.addBinaryAnnotations(protoBinaryAnnotationBuilder.build());
		}

		return protoSpanBuilder.build();
	}
	/**
	 * Converts an endpoint to protobuf. Shared by both annotation loops to
	 * remove the previously duplicated conversion code.
	 *
	 * @param endpoint the span endpoint; its port is optional and skipped when null
	 * @return the protobuf endpoint
	 */
	private ProtoEndpoint buildProtoEndpoint(Endpoint endpoint) {
		ProtoEndpoint.Builder protoEndpointBuilder = ProtoEndpoint.newBuilder();
		protoEndpointBuilder.setServiceName(endpoint.getServiceName());
		protoEndpointBuilder.setHost(endpoint.getHost());
		if(endpoint.getPort() != null) {
			protoEndpointBuilder.setPort(endpoint.getPort());
		}
		return protoEndpointBuilder.build();
	}
}